Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refact(core): optimized batch removal of remaining indices consumed by a single consumer #2203

Merged
merged 12 commits into from
May 31, 2023

Conversation

zyxxoo
Copy link
Contributor

@zyxxoo zyxxoo commented Apr 23, 2023

No description provided.

@codecov
Copy link

codecov bot commented Apr 23, 2023

Codecov Report

Merging #2203 (d087eca) into master (f23c648) will increase coverage by 4.18%.
The diff coverage is 64.34%.

@@             Coverage Diff              @@
##             master    #2203      +/-   ##
============================================
+ Coverage     64.39%   68.58%   +4.18%     
- Complexity      678      979     +301     
============================================
  Files           497      498       +1     
  Lines         40573    40681     +108     
  Branches       5663     5681      +18     
============================================
+ Hits          26129    27902    +1773     
+ Misses        11828    10074    -1754     
- Partials       2616     2705      +89     
Impacted Files Coverage Δ
...raph/backend/store/rocksdb/RocksDBStdSessions.java 75.64% <0.00%> (+6.34%) ⬆️
...ugegraph/backend/store/raft/StoreSnapshotFile.java 41.88% <50.00%> (+41.88%) ⬆️
.../hugegraph/backend/store/rocksdb/RocksDBStore.java 73.46% <50.00%> (+30.50%) ⬆️
...a/org/apache/hugegraph/task/EphemeralJobQueue.java 62.50% <62.50%> (ø)
...he/hugegraph/backend/tx/GraphIndexTransaction.java 83.36% <71.42%> (-0.36%) ⬇️
...n/java/org/apache/hugegraph/StandardHugeGraph.java 69.15% <100.00%> (+1.48%) ⬆️
...hugegraph/backend/store/raft/rpc/RpcForwarder.java 70.96% <100.00%> (+70.96%) ⬆️
...n/java/org/apache/hugegraph/util/CompressUtil.java 87.25% <100.00%> (ø)
...hugegraph/backend/store/rocksdb/OpenedRocksDB.java 73.33% <100.00%> (ø)
...h/backend/store/rocksdbsst/RocksDBSstSessions.java 47.65% <100.00%> (ø)

... and 76 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@zyxxoo zyxxoo force-pushed the zy_dev branch 3 times, most recently from 53d99a1 to 8441b2a Compare April 23, 2023 09:58
@zyxxoo zyxxoo changed the title single task consume batch remove left index task chore: single task consume batch remove left index task Apr 23, 2023
@zyxxoo zyxxoo requested review from imbajin and javeme April 23, 2023 09:59
@zyxxoo zyxxoo changed the title chore: single task consume batch remove left index task optimized: batch removal of remaining indices consumed by a single consumer" Apr 23, 2023
@zyxxoo
Copy link
Contributor Author

zyxxoo commented Apr 23, 2023

optimize #2081 #2201

if (this.state.compareAndSet(State.INIT, State.EXECUTE)) {
try {
RemoveLeftIndexJob job = new RemoveLeftIndexJob(pendingQueue, this::consumeComplete,
this::reSchedule);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we just call reSchedule() in consumeComplete()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, the reSchedule method is to start the job execution when it detects that there is data. The consumeComplete function means that the current job has been completed and the mark needs to be updated. The next time new data enters the queue, the job can be triggered

@imbajin imbajin changed the title optimized: batch removal of remaining indices consumed by a single consumer" refact(core): optimized batch removal of remaining indices consumed by a single consumer Apr 23, 2023
javeme
javeme previously approved these changes May 5, 2023
Copy link
Member

@imbajin imbajin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review it soon:)

@javeme
Copy link
Contributor

javeme commented May 6, 2023

raft api test error:

[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO] Running org.apache.hugegraph.api.ApiTestSuite
Error:  Tests run: 90, Failures: 1, Errors: 0, Skipped: 1, Time elapsed: 714.611 s <<< FAILURE! - in org.apache.hugegraph.api.ApiTestSuite
Error:  testTruncate(org.apache.hugegraph.api.GremlinApiTest)  Time elapsed: 1.257 s  <<< FAILURE!
java.lang.AssertionError: Response with status 500 and content {"exception":"java.lang.IllegalStateException","message":"The snapshot future can't be null","cause":"[java.lang.IllegalStateException]"} expected:<200> but was:<500>
	at org.apache.hugegraph.api.GremlinApiTest.testTruncate(GremlinApiTest.java:139)

[INFO] 
[INFO] Results:
[INFO] 
Error:  Failures: 
Error:    GremlinApiTest.testTruncate:139->BaseApiTest.assertResponseStatus:615 Response with status 500 and content {"exception":"java.lang.IllegalStateException","message":"The snapshot future can't be null","cause":"[java.lang.IllegalStateException]"} expected:<200> but was:<500>
[INFO] 
Error:  Tests run: 90, Failures: 1, Errors: 0, Skipped: 1

@zyxxoo
Copy link
Contributor Author

zyxxoo commented May 8, 2023

GremlinApiTest
public void forwardToLeader(PeerId leaderId, StoreCommand command,
                              RaftStoreClosure future) {
      E.checkNotNull(leaderId, "leader id");
      E.checkState(!leaderId.equals(this.nodeId),
                   "Invalid state: current node is the leader, there is " +
                   "no need to forward the request");
      LOG.debug("The node {} forward request to leader {}",
                this.nodeId, leaderId);

      StoreCommandRequest.Builder builder = StoreCommandRequest.newBuilder();
      builder.setType(command.type());
      builder.setAction(command.action());
      builder.setData(ZeroByteStringHelper.wrap(command.data()));
      StoreCommandRequest request = builder.build();

      RpcResponseClosure<StoreCommandResponse> responseClosure;
      responseClosure = new RpcResponseClosure<StoreCommandResponse>() {
          @Override
          public void setResponse(StoreCommandResponse response) {
              if (response.getStatus()) {
                  LOG.debug("StoreCommandResponse status ok");
                  future.complete(Status.OK(), () -> null);
              } else {
                  LOG.debug("StoreCommandResponse status error");
                  Status status = new Status(RaftError.UNKNOWN,
                                             "fowared request failed");
                  BackendException e = new BackendException(
                                       "Current node isn't leader, leader " +
                                       "is [%s], failed to forward request " +
                                       "to leader: %s",
                                       leaderId, response.getMessage());
                  future.failure(status, e);
              }
          }

          @Override
          public void run(Status status) {
              future.run(status);
          }
      };
      this.waitRpc(leaderId.getEndpoint(), request, responseClosure);
  }

the code future.complete(Status.OK(), () -> null); return a null future, Maybe be the place should put a future that complete with null value

@zyxxoo
Copy link
Contributor Author

zyxxoo commented May 8, 2023

java 11 rocksdb block
when i run ApiTestSuit on local env, the test will block on CypherApiTest#testRelationQuery;
but if i only run CypherApiTest on ApiTestSuit(Comment other classes), the running will not block or block on CypherApiTest other test method

@zyxxoo zyxxoo force-pushed the zy_dev branch 2 times, most recently from 2e76d4b to 16f6934 Compare May 8, 2023 07:29
@@ -145,7 +145,7 @@ public static void decompressTar(String sourceFile, String outputDir,
Files.createDirectories(newPath);
} else {
// check parent folder again
Path parent = newPath.getParent();
Path parent = newPath.toAbsolutePath().getParent();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we split into a separated commit?

Copy link
Contributor Author

@zyxxoo zyxxoo May 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mainly did it for convenience, putting two git commits together is simpler.

return ret;
}

private Object executeBatchJob(List<EphemeralJob<?>> jobs, Object prev) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can keep List<Job<?>>?

Copy link
Contributor Author

@zyxxoo zyxxoo May 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main purpose of the queue is to batch EphemeralJob. The name EphemeralJob is preferred and the job's parameters must be set during initialization (the SysTaskCallable defines the params method).

if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
if (queue != null) {
queue.queue().clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure why called clear() here.
and prefer to call queue.clear().

Copy link
Contributor Author

@zyxxoo zyxxoo May 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When an InterruptedException occurs, these tasks may be aborted, so clear all tasks

.job(job)
.schedule();
return task.id();
job.selfCommit(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does only RemoveLeftIndexJob class contain the selfCommit() method? if not then we can move selfCommit() call into submitEphemeralJob().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should handle everything in batches to keep it simple. Let's exclude 'selfCommit' for now.


private static final long PAGE_SIZE = Query.COMMIT_BATCH;
private static final String BATCH_EPHEMERAL_JOB = "batch-ephemeral-job";
private static final int MAX_CONSUME_COUNT = EphemeralJobQueue.CAPACITY / 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now COMMIT_BATCH is 100, CAPACITY is 1000.
should we let CAPACITY >> COMMIT_BATCH and MAX_CONSUME_COUNT ~= PAGE_SIZE?
such as let MAX_CONSUME_COUNT = COMMIT_BATCH, let CAPACITY = 100 * COMMIT_BATCH

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean remove "PAGE_SIZE" field?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I change to below formula:
CAPACITY = 100 * Query.COMMIT_BATCH;
PAGE_SIZE = Query.COMMIT_BATCH;
MAX_CONSUME_COUNT = 2 * PAGE_SIZE;

javeme
javeme previously approved these changes May 16, 2023
@@ -1780,7 +1776,6 @@ protected long removeIndexLeft(ConditionQuery query,
// Process secondary index or search index
sCount += this.processSecondaryOrSearchIndexLeft(cq, element);
}
this.tx.commit();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we don't support auto commit anymore?

Comment on lines +80 to +84
// This code forwards the request to the Raft leader and considers the operation successful
// if it's forwarded successfully. It returns a RaftClosure because the calling
// logic expects a RaftClosure result. Specifically, if the current instance is the Raft leader,
// it executes the corresponding logic locally and notifies the calling logic asynchronously
// via RaftClosure. Therefore, the result is returned as a RaftClosure here.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe better to use /* */ next time

@imbajin imbajin merged commit e1d9607 into master May 31, 2023
@imbajin imbajin deleted the zy_dev branch May 31, 2023 04:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

3 participants